Big Data Analytics using Apache Spark

CN6022 - Big Data Infrastructure and Manipulation

Authors

Jayrup Nakawala (u2613621)

Yogi Patel (u2536809)

Jasmi Alasapuri (u2571395)

1 Introduction

1.1 Project Overview

This project aims to demonstrate advanced Big Data manipulation and analytics using Apache Spark SQL. While the module provided a baseline dataset (Air Flight Status), we have elected to utilize the ESA Gaia Data Release 3 (DR3) for this analysis. Gaia is widely considered the largest and most complex astronomical catalog in human history, containing astrometric and photometric data for over 1.8 billion sources.

1.2 Dataset Selection and Justification

The core requirement for this coursework was to utilize a dataset that exceeds the volume and complexity of the provided sample. The Gaia DR3 dataset fits this criterion perfectly for three reasons:

  1. Volume: Even a 1% subset of Gaia (approximately 3 million rows) significantly exceeds the size of the standard flight dataset, requiring distributed computing techniques to process efficiently.
  2. Complexity: Unlike the flat structure of flight logs, astronomical data requires complex feature engineering (e.g., calculating Absolute Magnitude from Parallax).
  3. Scientific Relevance: This dataset allows for genuine astrophysical discovery including the identification of White Dwarfs and Binary Systems.

1.3 Data Acquisition Strategy: The “Two-Tier” Architecture

Ingesting the entire 1.8 billion row catalog is computationally infeasible for this project’s scope. Furthermore, a simple random sample introduces Malmquist Bias, where bright, distant stars drown out faint, local objects. To resolve this, we architected a Two-Tier Data Strategy, acquiring two distinct datasets via the ESA Archive (ADQL):

  • Dataset A: The “Galactic Survey” (Macro-Analysis): A random 1% sample of the entire sky (~3 million rows). This “Deep Field” dataset is used to map the broad structure of the Milky Way and analyze general stellar demographics, breadth over precision
  • Dataset B: The “Local Bubble” (Micro-Physics): A volume-limited sample of all stars within 100 parsecs (\(distance \le 100pc\)). This high-precision dataset eliminates distance-related noise, allowing us to detect faint objects like White Dwarfs that would otherwise be invisible, precision over breadth.

1.3.1 Data Schema and Key Columns

The following columns from the Gaia dataset will be used in our analysis:

  • source_id: Unique identifier for each star. (64-bit Integer)
  • ra: Right Ascension (celestial longitude). (Double Precision)
  • dec: Declination (celestial latitude). (Double Precision)
  • parallax: Parallax in milliarcseconds, used to calculate distance (\(d = 1/p\)). (Double Precision)
  • parallax_error: The uncertainty in the parallax measurement. (Single Precision)
  • pmra: Proper motion in the direction of Right Ascension. (Double Precision)
  • pmdec: Proper motion in the direction of Declination. (Double Precision)
  • phot_g_mean_mag: Mean apparent magnitude in the G-band (a measure of brightness as seen from Earth). (Single Precision)
  • bp_rp: The blue-red color index, a proxy for the star’s surface temperature. (Single Precision)
  • teff_gspphot: Effective temperature of the star’s photosphere, derived from photometry. (Single Precision)

1.4 Team Structure and Objectives

The analysis is divided into three distinct workstreams, each focusing on a different aspect of the data:

  • Jasmi (Stellar Demographics): Focuses on classifying star populations (H-R Diagram) and identifying high-velocity outliers using the Galactic Survey.
  • Yogi (Galactic Structure): detailed mapping of the Milky Way’s density and analysis of measurement error rates across the sky.
  • Jayrup (Exotic Star Hunting): Utilizes the high-precision “Local Bubble” and “Galactic Survey” data to detect rare stellar remnants and gravitationally bound binary star systems.

1.5 Understanding the Data

1.5.1 Installing dependencies

Code
!pip install pyspark astroquery pandas pyarrow seaborn matplotlib --quiet
zsh:1: command not found: pip

1.5.2 Downloading the Datasets

Code
from astroquery.gaia import Gaia
import os

output_dir = "../data"
os.makedirs(output_dir, exist_ok=True)

def save_strict_parquet(results, filename):
    """
    Converts Astropy Table to Pandas with strict Gaia Data Model types.
    """
    df = results.to_pandas()
    
    # 1. Enforce Source_ID as 64-bit Integer (Long)
    df['source_id'] = df['source_id'].astype('int64')

    # 2. Enforce Double Precision (float64) for Angles/Velocity
    doubles = ['ra', 'dec', 'parallax', 'pmra', 'pmdec']
    for col in doubles:
        if col in df.columns:
            df[col] = df[col].astype('float64')

    # 3. Enforce Single Precision (float32) for Errors/Magnitudes
    # This saves 50% RAM on these columns vs standard floats.
    floats = ['parallax_error', 'bp_rp', 'phot_g_mean_mag','teff_gspphot']
    for col in floats:
        if col in df.columns:
            df[col] = df[col].astype('float32')
            
    # Save
    print(f">> Saving {len(df)} rows to {filename}...")
    df.to_parquet(filename, index=False)


# --- JOB 1: SURVEY ---
survey_file = os.path.join(output_dir, "gaia_survey.parquet")
if not os.path.exists(survey_file):
    print(">> Downloading Survey...")
    q = """
    SELECT source_id, ra, dec, parallax, parallax_error, pmra, pmdec, 
           phot_g_mean_mag, bp_rp, teff_gspphot
    FROM gaiadr3.gaia_source
    WHERE parallax > 0 AND phot_g_mean_mag < 19 AND random_index < 3000000
    """
    job = Gaia.launch_job_async(q)
    save_strict_parquet(job.get_results(), survey_file)
else:
    print(">> Survey already downloaded.")

# --- JOB 2: LOCAL BUBBLE ---
local_file = os.path.join(output_dir, "gaia_100pc.parquet")
if not os.path.exists(local_file):
    print(">> Downloading Local Bubble...")
    q = """
    SELECT source_id, ra, dec, parallax, parallax_error, pmra, pmdec, 
           phot_g_mean_mag, bp_rp, teff_gspphot
    FROM gaiadr3.gaia_source
    WHERE parallax >= 10 AND parallax_over_error > 5
    """
    job = Gaia.launch_job_async(q)
    save_strict_parquet(job.get_results(), local_file)
else:
    print(">> Local Bubble already downloaded.")

print(">> Done.")
>> Survey already downloaded.
>> Local Bubble already downloaded.
>> Done.

1.5.3 Exploring the Datasets

Code
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

# Initialize Spark
spark = SparkSession.builder \
    .appName("Gaia_Data_Exploration") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Load Datasets
survey_path = "../data/gaia_survey.parquet"
local_path = "../data/gaia_100pc.parquet"

df_survey = spark.read.parquet(survey_path)
df_local = spark.read.parquet(local_path)

print(">>> DATASET 1: GALACTIC SURVEY (Macro)")
print(f"Total Rows: {df_survey.count():,}")
df_survey.printSchema()

print("\n>>> DATASET 2: LOCAL BUBBLE (Micro)")
print(f"Total Rows: {df_local.count():,}")
df_local.printSchema()

# ====================================================
# 1. PHYSICAL COMPARISON
# ====================================================

print("\n>>> STATISTICAL COMPARISON: PARALLAX (Distance)")
print("Note: Distance (pc) is approx 1000 / parallax.")

print("-- Survey Dataset Stats --")
df_survey.select("parallax", "phot_g_mean_mag", "pmra").describe().show()

print("-- Local Bubble Stats --")
df_local.select("parallax", "phot_g_mean_mag", "pmra").describe().show()

# ====================================================
# 2. QUALITY CHECK (Null Analysis)
# ====================================================

def count_nulls(df):
    return df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])

# We only check critical columns
from pyspark.sql.functions import when
cols_to_check = ["parallax", "pmra", "teff_gspphot", "bp_rp"]

print("\n>>> NULL VALUE ANALYSIS (Survey Dataset)")
df_survey.select([count(when(col(c).isNull(), c)).alias(c) for c in cols_to_check]).show()

print("\n>>> NULL VALUE ANALYSIS (Local Dataset)")
df_local.select([count(when(col(c).isNull(), c)).alias(c) for c in cols_to_check]).show()
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
[Stage 0:>                                                          (0 + 1) / 1]                                                                                
>>> DATASET 1: GALACTIC SURVEY (Macro)
Total Rows: 844,868
root
 |-- source_id: long (nullable = true)
 |-- ra: double (nullable = true)
 |-- dec: double (nullable = true)
 |-- parallax: double (nullable = true)
 |-- parallax_error: float (nullable = true)
 |-- pmra: double (nullable = true)
 |-- pmdec: double (nullable = true)
 |-- phot_g_mean_mag: float (nullable = true)
 |-- bp_rp: float (nullable = true)
 |-- teff_gspphot: float (nullable = true)


>>> DATASET 2: LOCAL BUBBLE (Micro)
Total Rows: 541,958
root
 |-- source_id: long (nullable = true)
 |-- ra: double (nullable = true)
 |-- dec: double (nullable = true)
 |-- parallax: double (nullable = true)
 |-- parallax_error: float (nullable = true)
 |-- pmra: double (nullable = true)
 |-- pmdec: double (nullable = true)
 |-- phot_g_mean_mag: float (nullable = true)
 |-- bp_rp: float (nullable = true)
 |-- teff_gspphot: float (nullable = true)


>>> STATISTICAL COMPARISON: PARALLAX (Distance)
Note: Distance (pc) is approx 1000 / parallax.
-- Survey Dataset Stats --
[Stage 8:====================================================>    (13 + 1) / 14]                                                                                
+-------+--------------------+------------------+-------------------+
|summary|            parallax|   phot_g_mean_mag|               pmra|
+-------+--------------------+------------------+-------------------+
|  count|              844868|            844868|             844868|
|   mean|    0.55071253992443|17.404869147894882| -2.373963360005405|
| stddev|  0.7697707648724794|1.4361723326971574| 7.0091972840050865|
|    min|1.868718209532133E-7|         3.0238004|-377.74180694687686|
|    max|   75.56887569382528|         18.999998|  649.0319386167508|
+-------+--------------------+------------------+-------------------+

-- Local Bubble Stats --
+-------+------------------+-----------------+------------------+
|summary|          parallax|  phot_g_mean_mag|              pmra|
+-------+------------------+-----------------+------------------+
|  count|            541958|           540859|            541958|
|   mean|14.283010896409225|16.94108547991132|-3.085397329690918|
| stddev|7.0432588218793475|3.605457797042056|  94.7899342646246|
|    min|10.000005410606198|        1.9425238|-4406.469178827325|
|    max| 768.0665391873573|        21.289928| 6765.995136250774|
+-------+------------------+-----------------+------------------+


>>> NULL VALUE ANALYSIS (Survey Dataset)
+--------+----+------------+-----+
|parallax|pmra|teff_gspphot|bp_rp|
+--------+----+------------+-----+
|       0|   0|      125194|23080|
+--------+----+------------+-----+


>>> NULL VALUE ANALYSIS (Local Dataset)
+--------+----+------------+-----+
|parallax|pmra|teff_gspphot|bp_rp|
+--------+----+------------+-----+
|       0|   0|      423880|68171|
+--------+----+------------+-----+

2 Queries

2.1 Jasmi

2.1.1 The H-R Diagram

To prepare a high-quality, filtered dataset by calculating the intrinsic luminosity (Absolute Magnitude, \(M_G\)) for every star in the survey, providing the necessary data for the H-R Diagram’s Y-axis.

2.1.1.1 Methodology

This phase uses a Direct Calculation and Strict Quality Filtering approach in Spark SQL. The complexity lies not in aggregation, but in applying the critical astronomical transformation formula and enforcing a high Signal-to-Noise Ratio (SNR) on the distance data before transferring the results to Python for advanced plotting.

2.1.1.2 Parameter Justification

To ensure the resulting H-R Diagram is scientifically precise—preventing the smearing of the Main Sequence—strict parameters were justified and applied:

  • Absolute Magnitude (\(M_G\)) Calculation The luminosity is calculated using the standard formula :

    \[ M_G = m - 5 \log_{10}(d) + 5 \]

    where \(m\) is the apparent magnitude (phot_g_mean_mag) and \(d\) is the distance in parsecs (derived from \(\frac{1000}{\text{parallax}}\)). Using \(M_G\) is mandatory because the H-R diagram plots luminosity, which is independent of Earth’s viewing distance.

  • Colour Index (bp_rp) This direct photometric measurement provides the most reliable measure of the star’s effective surface temperature (the X-axis of the H-R Diagram). It is directly selected for the plot without modification.

  • Quality Cut (Parallax / Parallax Error \(\geq 5.0\)) The signal-to-noise ratio (SNR) of the parallax is calculated dynamically and filtered to values greater than 5.0. This Critical SNR Cut is the most important filter: it ensures that only stars with highly reliable distance estimates are processed. Stars with poor parallax data would otherwise cause the Main Sequence to appear thick and indistinct, masking key stellar populations.

2.1.1.3 The Query Logic

The analysis was performed using a single, efficient query that focused purely on mathematical transformation and data exclusion:

  • Derivation The Absolute Magnitude (\(\text{abs\_mag}\)) was calculated directly in the SELECT clause using the \(\log_{10}\) function applied to the parallax.

  • Sanitisation The WHERE clause performed strict filtering to remove all low-quality and non-physical measurements:

    • Exclusion of stars with invalid distance (parallax > 0).
    • Exclusion of stars missing essential photometry (bp_rp IS NOT NULL and phot\_g\_mean\_mag IS NOT NULL).
    • Exclusion of all data points failing the SNR \(\geq 5.0\) standard.
  • Data Transfer The final output was checked for total row count and then converted from a high-performance Spark DataFrame (raw_df) to a standard Pandas DataFrame (pdf). This transfer is necessary to facilitate the advanced plotting capabilities of Matplotlib and NumPy.

Code
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, log10, lit, count, when, sqrt, pow, percentile_approx
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import os

import pandas as pd
import numpy as np

import seaborn as sns
import matplotlib.pyplot as plt

from matplotlib import colors
spark = SparkSession.builder \
    .appName("Gaia_HR_Analysis") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Loading Data
df_survey = spark.read.parquet("../data/gaia_survey.parquet")
# Creating a temp view
df_survey.createOrReplaceTempView("gaia_survey")

#Query
query_raw = """
SELECT 
    source_id,
    bp_rp,
    -- Calculate Absolute Magnitude (M) directly in Spark
    -- Formula: Apparent Mag - 5 * log10(Distance) + 5
    (phot_g_mean_mag - 5 * LOG10(1000 / parallax) + 5) AS abs_mag
FROM gaia_survey
WHERE 
    parallax > 0 
    AND (parallax / parallax_error) >= 5.0  -- For better precision 
    AND bp_rp IS NOT NULL 
    AND phot_g_mean_mag IS NOT NULL
"""

print(">>> RUNNING QUERY...")
raw_df = spark.sql(query_raw)

# Check how many stars to plot
print(f"Total stars to plot: {raw_df.count():,}")

# Convert to Pandas
pdf = raw_df.toPandas()
print("Data loaded.")

# checking the top 5 rows
print("\n------------------------------\nPrinting first five rows\n------------------------------")
pdf.head()
25/12/16 22:00:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
>>> RUNNING QUERY...
Total stars to plot: 296,983
[Stage 24:====================================================>   (13 + 1) / 14]                                                                                
Data loaded.

------------------------------
Printing first five rows
------------------------------
source_id bp_rp abs_mag
0 242682832483968 2.051584 9.188913
1 5474605834008192 0.437714 12.840361
2 6591404705437056 1.121918 6.156537
3 6640539131235840 2.003047 6.955578
4 7473968945265152 1.275984 5.436462

2.1.1.4 Visualisation Logic

The final visualization step—performed in Python using the data prepared by this query—is a significant improvement over simple heatmapping:

  • Advanced Density Scatter: Instead of using fixed SQL bins, the data is plotted using a density-mapped scatter plot.
  • NumPy Density Trick: NumPy’s histogram2d function is used to assign a density score to every individual point.
  • Visual Enhancement: The data is sorted by this density score and plotted with a logarithmic colour scale (cmap='inferno'). This technique ensures that the densest region (the Main Sequence core) is plotted last and remains sharp, while also making faint, low-density features (like the White Dwarf sequence) clearly visible.
Code
nbins = 300
k = colors.LogNorm() 

x = pdf['bp_rp'].values
y = pdf['abs_mag'].values

plt.style.use('dark_background') # Dark background makes the colors pop

# Create the grid
H, xedges, yedges = np.histogram2d(x, y, bins=nbins)

# Map every star to its bin
x_inds = np.clip(np.digitize(x, xedges) - 1, 0, nbins - 1)
y_inds = np.clip(np.digitize(y, yedges) - 1, 0, nbins - 1)

# Assign density value to each star
z = H[x_inds, y_inds]

# 3. Sort for Sharpness
# Sort the data so the densest (brightest) regions are plotted LAST (on top)
idx = z.argsort()
x, y, z = x[idx], y[idx], z[idx]

# 4. Plot: Use 'magma' colormap for clearer density progression
plt.figure(figsize=(8, 10))

# 'magma' or 'plasma' provide excellent contrast for astronomical density plots
plt.scatter(x, y, c=z, s=0.5, cmap='magma', norm=k, alpha=1.0, edgecolors='none')

# 5. Astronomy Polish
plt.title("Gaia DR3 Hertzsprung-Russell Diagram", fontsize=16)
plt.xlabel("Color Index (BP-RP)", fontsize=12)
plt.ylabel("Absolute Magnitude (M)", fontsize=12)

# Invert Y-Axis (Bright stars go at the top)
plt.ylim(17, -5)
plt.xlim(-1, 5)

# Add Colorbar
cbar = plt.colorbar()
cbar.set_label('Star Density', rotation=270, labelpad=20)

# Annotations (FIXED: Changed colour and added bolding for prominence)
plt.text(0.5, 14, 'White Dwarfs', color='white', fontsize=12, fontweight='bold')
plt.text(1.5, 4, 'Main Sequence', color='gold', fontsize=12, rotation=-45, fontweight='bold')
plt.text(2.5, 0, 'Giants', color='orange', fontsize=12, fontweight='bold')

plt.tight_layout()
plt.show()

Advanced Density Scatter

2.1.2 Stellar Population Census & Astrophysical Classification

To classify the stellar population into major spectral types (O through M) and to distinguish between Main Sequence stars (dwarfs) and evolved stars (giants).

2.1.2.1 Methodology

This query uses a Colour–Magnitude approach rather than relying on raw temperature estimates. This method provides more reliable classification, particularly for faint sources where temperature estimations can become inaccurate.

2.1.2.2 Dataset

Gaia DR3 Source (Random 3 Million Object Sample)

2.1.2.3 Parameter Justification

To ensure scientific accuracy and robustness in the classification process, the following parameters were selected:

  • Colour Index (bp_rp)
    The colour index is calculated as the difference between the Blue (bp) and Red (rp) photometric bands. This metric is used instead of the algorithmically derived effective temperature (teff_gspphot). Temperature estimates tend to degrade for faint stars, often producing artificially high temperature values. In contrast, the colour index is a direct photometric measurement and provides a more reliable separation of stellar spectral types.

  • Quality Cut (Parallax / Parallax Error > 5)
    The signal-to-noise ratio of the parallax is calculated dynamically and filtered to values greater than 5. This ensures that only stars with reliable distance measurements are included in the analysis. Applying this threshold reduces distance-related uncertainties and prevents the misclassification of dwarfs as giants due to parallax errors, commonly referred to as “ghost giants”.

  • Absolute Magnitude (\(M_G\))
    The intrinsic brightness of each star is calculated using the formula:
    [ M_G = m - 5 _{10}(d) + 5 ]
    where ( m ) is the apparent magnitude and ( d ) is the distance in parsecs. Absolute magnitude allows for a clear distinction between stellar evolutionary stages. Red giants are identified as intrinsically bright stars (( M_G < 3.5 )), while red dwarfs are significantly dimmer (( M_G > 3.5 )). This separation cannot be achieved using apparent magnitude alone.

Code
# 1. The Corrected Query (Calculates error ratio on the fly)
query = """
WITH PhysicalProperties AS (
    SELECT
        source_id,
        bp_rp,
        -- Calculate Absolute Magnitude (Mg)
        phot_g_mean_mag - 5 * LOG10(1000 / parallax) + 5 AS abs_mag_g,
        
        -- FIX: Calculate the Signal-to-Noise ratio manually
        (parallax / parallax_error) AS calculated_poe
    FROM gaia_survey
    WHERE parallax > 0 
      AND parallax_error > 0 -- Prevent divide-by-zero errors
),

FilteredStars AS (
    SELECT * FROM PhysicalProperties
    WHERE calculated_poe > 5 -- Quality Cut: Only keep reliable data
),

ClassifiedStars AS (
    SELECT
        -- 1. Spectral Classification (Colour)
        CASE
            WHEN bp_rp IS NULL THEN 'Unknown'
            WHEN bp_rp > 1.6 THEN 'Cool (M)'
            WHEN bp_rp > 0.9 THEN 'Warm (K)'
            WHEN bp_rp > 0.7 THEN 'Yellow (G)'
            WHEN bp_rp > 0.4 THEN 'Yel-Wht (F)'
            WHEN bp_rp > 0.0 THEN 'White (A)'
            ELSE 'Hot (O/B)'
        END AS spectral_type,

        -- 2. Luminosity Classification (Giant vs Dwarf)
        CASE
            WHEN abs_mag_g < 3.5 AND bp_rp > 0.7 THEN 'Giant'
            ELSE 'Main Sequence (Dwarf)'
        END AS luminosity_class
    FROM FilteredStars
)

SELECT
    spectral_type,
    COUNT(CASE WHEN luminosity_class = 'Main Sequence (Dwarf)' THEN 1 END) AS dwarf_count,
    COUNT(CASE WHEN luminosity_class = 'Giant' THEN 1 END) AS giant_count,
    COUNT(*) AS total_count,
    ROUND(100.0 * COUNT(CASE WHEN luminosity_class = 'Giant' THEN 1 END) / COUNT(*), 1) AS giant_percentage
FROM ClassifiedStars
WHERE spectral_type != 'Unknown'
GROUP BY spectral_type
ORDER BY 
    CASE spectral_type
        WHEN 'Hot (O/B)' THEN 1
        WHEN 'White (A)' THEN 2
        WHEN 'Yel-Wht (F)' THEN 3
        WHEN 'Yellow (G)' THEN 4
        WHEN 'Warm (K)' THEN 5
        WHEN 'Cool (M)' THEN 6
        ELSE 7
    END
"""

# 2. Execute
df_results = spark.sql(query)

# 3. Show Results
print(">> Census Results: Dwarfs vs. Giants (calculated_poe fix)")
df_results.show(truncate=False)
>> Census Results: Dwarfs vs. Giants (calculated_poe fix)
[Stage 25:====================================================>   (13 + 1) / 14]
+-------------+-----------+-----------+-----------+----------------+
|spectral_type|dwarf_count|giant_count|total_count|giant_percentage|
+-------------+-----------+-----------+-----------+----------------+
|Hot (O/B)    |94         |0          |94         |0.0             |
|White (A)    |745        |0          |745        |0.0             |
|Yel-Wht (F)  |4536       |0          |4536       |0.0             |
|Yellow (G)   |21349      |6598       |27947      |23.6            |
|Warm (K)     |143669     |21220      |164889     |12.9            |
|Cool (M)     |84014      |14758      |98772      |14.9            |
+-------------+-----------+-----------+-----------+----------------+
                                                                                

2.1.2.4 Visualisation Logic

To clearly present the results of the analysis, two visualisations were produced:

  • Population Mix (Stacked Bar Chart)
    A 100% stacked bar chart was used to illustrate the proportion of giants versus dwarfs within each spectral type. This visualisation highlights the evolutionary composition of the stellar population and allows for direct comparison across spectral classes.
Code
# 1. Convert your Spark results to Pandas
# (Assuming df_results is the dataframe from your last query)
pdf = df_results.toPandas()

# 2. Setup the Data for Plotting
# We need to sort the data from Hot to Cool for the X-axis
sort_order = {
    'Hot (O/B)': 0, 'White (A)': 1, 'Yel-Wht (F)': 2, 
    'Yellow (G)': 3, 'Warm (K)': 4, 'Cool (M)': 5
}
pdf['sort_id'] = pdf['spectral_type'].map(sort_order)
pdf = pdf.sort_values('sort_id')

# Calculate Percentages for the Stacked Bar
# (We re-calculate here to ensure they sum to exactly 100 for the plot)
pdf['dwarf_pct'] = (pdf['dwarf_count'] / pdf['total_count']) * 100
pdf['giant_pct'] = (pdf['giant_count'] / pdf['total_count']) * 100

# 3. Create the Plot
fig, ax = plt.subplots(figsize=(10, 6))

# Plot Dwarfs (Bottom Bar)
p1 = ax.bar(pdf['spectral_type'], pdf['dwarf_pct'], label='Main Sequence (Dwarfs)', 
            color='#1f77b4', edgecolor='black', alpha=0.9)

# Plot Giants (Top Bar)
p2 = ax.bar(pdf['spectral_type'], pdf['giant_pct'], bottom=pdf['dwarf_pct'], 
            label='Giants (Evolved)', color='#d62728', edgecolor='black', alpha=0.9)

# 4. Styling
ax.set_title('Stellar Population Mix: Dwarfs vs. Giants (High-Quality Subset)', fontsize=16)
ax.set_ylabel('Percentage of Population (%)', fontsize=12)
ax.set_xlabel('Spectral Type', fontsize=12)
ax.set_ylim(0, 100)
ax.legend(loc='upper left', frameon=True)
ax.grid(axis='y', linestyle='--', alpha=0.4)

# 5. Add Labels
# Label the Giants if they exist
for i, (idx, row) in enumerate(pdf.iterrows()):
    if row['giant_pct'] > 1:
        ax.text(i, row['dwarf_pct'] + row['giant_pct']/2, f"{row['giant_pct']:.1f}%", 
                ha='center', va='center', color='white', fontweight='bold', fontsize=11)
        
    # Label the Dwarfs
    if row['dwarf_pct'] > 5:
        ax.text(i, row['dwarf_pct']/2, f"{row['dwarf_pct']:.1f}%", 
                ha='center', va='center', color='white', fontweight='bold', fontsize=11)

plt.style.use('dark_background') # Dark background makes the colors pop
plt.tight_layout()
plt.show()

Population Mix (Stacked Bar Chart)

2.1.2.5 Interpretation of Results

  • O, B, A, and F Spectral Types
    The analysis identified 0.0% giant stars within these spectral classes. This result is consistent with stellar evolution theory, as massive, blue stars evolve rapidly and do not remain in the blue region of the spectrum once they leave the Main Sequence.

  • G and K Spectral Types
    A substantial giant population was observed within these classes, accounting for approximately 13–24% of the sample. This correctly traces the Red Giant Branch and distinguishes evolved stars, such as Arcturus, from nearby solar-type dwarfs.

  • M Spectral Type
    The M-type population was overwhelmingly dominated by dwarfs. This reflects the high abundance of red dwarfs in the galaxy and the relative rarity of true M-type giants in a randomly selected stellar sample.

2.1.3 High-Velocity Outlier Detection (Kinematic Analysis)

To identify and flag the top 1% of stars in the df_local dataset exhibiting the highest Total Proper Motion (apparent speed across the sky). These stars are often key kinematic outliers, such as halo stars or nearby high-velocity dwarfs.

  • Columns Needed: pmra (Proper Motion Right Ascension), pmdec (Proper Motion Declination).

  • SQL Complexity: Simplified and Optimised. The executed code avoids the slow, complex nested SQL window function (PERCENT_RANK()) proposed in the original plan, replacing it with a direct, single-action calculation in Spark.

    1. Mathematical Transformation: Total Proper Motion (\(\mu\)) is calculated using the Pythagorean theorem: \(\mu = \sqrt{\mu_{\alpha}^2 + \mu_{\delta}^2}\).
      • Code: sqrt(pow(col("pmra"), 2) + pow(col("pmdec"), 2))
    2. Threshold Calculation: The complexity is offloaded to the optimized Spark function percentile_approx(). This directly computes the 99th percentile proper motion value (pm_threshold) in a single, fast aggregation.
      • Code: df_motion.agg(percentile_approx("total_pm", lit(0.99)))
    3. Flagging (Simple Filter): The final SQL query becomes a simple filter applied to the pm_threshold, avoiding a subquery and expensive ranking.
      • Code: CASE WHEN total_pm >= {pm_threshold} THEN 1 ELSE 0 END AS is_high_pm
Code
# ====================================================
# 1. Calculate Total Proper Motion & Find the Threshold
# ====================================================

# Calculate total proper motion (total_pm = sqrt(pmra^2 + pmdec^2))
df_motion = df_local.withColumn(
    "total_pm", 
    sqrt(pow(col("pmra"), 2) + pow(col("pmdec"), 2))
)

# Find the 99th percentile (Top 1%) proper motion value
# This value will be our threshold (e.g., 100 mas/yr)
pm_threshold = df_motion.agg(
    percentile_approx("total_pm", lit(0.99)).alias("threshold_value")
).collect()[0]["threshold_value"]

print(f">>> Calculated 99th Percentile Proper Motion Threshold: {pm_threshold:.2f} mas/yr")

# ====================================================
# 2. SQL Query: Prepare Data for Plotting
# ====================================================

# Create a temporary view for the motion-enhanced DataFrame
df_motion.createOrReplaceTempView("gaia_motion")

# The query selects necessary fields and flags the fast-moving stars
plot_query_with_motion = f"""
SELECT 
    source_id,
    bp_rp,
    -- Calculate Absolute Magnitude (Mg)
    (phot_g_mean_mag - 5 * LOG10(1000 / parallax) + 5) AS abs_mag_g,
    total_pm,
    
    -- Flag if the star is in the top 1% of motion
    CASE 
        WHEN total_pm >= {pm_threshold} THEN 1 
        ELSE 0 
    END AS is_high_pm
    
FROM gaia_motion
WHERE parallax > 0 
  AND (parallax / parallax_error) > 5  -- The Quality Cut
  AND phot_g_mean_mag IS NOT NULL 
"""

# Execute the query
df_plot_motion = spark.sql(plot_query_with_motion)

# Convert to Pandas for plotting
pdf_motion = df_plot_motion.toPandas()

print(f"Total stars prepared for plotting: {len(pdf_motion):,}")
print(f"Total high-PM stars flagged: {pdf_motion['is_high_pm'].sum():,}")
[Stage 36:===================================================>      (8 + 1) / 9]                                                                                
>>> Calculated 99th Percentile Proper Motion Threshold: 450.85 mas/yr
Total stars prepared for plotting: 540,859
Total high-PM stars flagged: 5,435

2.1.3.1 Visualization

A Layered Scatter Plot on the H-R Diagram was used (as seen in the executed code). This is a highly effective scientific visualisation that goes beyond the simple table proposed in the original plan. It plots: * Background: The bulk (slow-moving) population (purple). * Foreground: The high-velocity outliers (is_high_pm = 1) in a distinct colour (cyan), showing their position relative to the main stellar sequences.

Code
import matplotlib.pyplot as plt

# Filter the data into two subsets
pdf_slow = pdf_motion[pdf_motion['is_high_pm'] == 0]
pdf_fast = pdf_motion[pdf_motion['is_high_pm'] == 1]

# 1. Create the Plot Canvas
plt.figure(figsize=(8, 10))
plt.style.use('dark_background')

# 2. Plot the Bulk Population (Slow/Main Sequence)
# Use a high alpha (low transparency) colour to show the main density
plt.scatter(
    pdf_slow['bp_rp'], 
    pdf_slow['abs_mag_g'], 
    c='purple',         # Base colour
    s=0.5,              # Small size
    alpha=0.2,          # Very transparent to show density variation
    edgecolors='none', 
    label='Bulk Population (Low PM)'
)

# 3. OVERLAY the High-Velocity Stars
# Use a distinct, bright colour and larger size
plt.scatter(
    pdf_fast['bp_rp'], 
    pdf_fast['abs_mag_g'], 
    c='cyan',           # Stand-out colour
    s=5,                # Much larger size
    alpha=1.0,          # Fully opaque
    edgecolors='none',
    label=f'High-Velocity Outliers (Top 1% > {pm_threshold:.0f} mas/yr)'
)

# 4. Polish and Axes
plt.gca().invert_yaxis()  # Brighter stars (lower mag) go at the top
plt.title('HR Diagram Highlighting Kinematic Outliers (Top 1% Proper Motion)', fontsize=16)
plt.xlabel('Colour Index (BP-RP)')
plt.ylabel('Absolute Magnitude (M)')
plt.xlim(-1, 5)
plt.ylim(17, -5)
plt.legend(loc='upper right')
plt.grid(alpha=0.1)

plt.style.use('dark_background') # Dark background makes the colors pop
plt.show()
spark.stop()

Layered Scatter Plot on the H-R Diagram

2.2 Yogi

2.2.1 Galactic Plane vs Halo

Code
from pyspark.sql import SparkSession
#from pyspark.sql.functions import * 
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Initialize Spark
spark = SparkSession.builder \
.appName("Member2_Galactic_Structure") \
.config("spark.driver.memory", "4g") \
.getOrCreate()

# 1. Load the Data
parquet_path = "../data/gaia_survey.parquet"

df = spark.read.parquet(parquet_path)


# describe the gaia_survey data
df.describe().show()
25/12/16 22:00:57 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 1:====================================================>    (13 + 1) / 14]
+-------+--------------------+--------------------+-------------------+--------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+
|summary|           source_id|                  ra|                dec|            parallax|     parallax_error|               pmra|              pmdec|   phot_g_mean_mag|             bp_rp|      teff_gspphot|
+-------+--------------------+--------------------+-------------------+--------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+
|  count|              844868|              844868|             844868|              844868|             844868|             844868|             844868|            844868|            821788|            719674|
|   mean|4.208655544660601...|   220.3738280138546|-14.661386665029692|    0.55071253992443|0.12661974095717216| -2.373963360005405|-3.0263002891849347|17.404869147894882|1.5300547396294117|  4878.39501996751|
| stddev|1.770170248935241...|   84.67372412021707|  38.84441601829229|  0.7697707648724794|0.08883984995128799| 7.0091972840050865|  6.990431755441395|1.4361723326971574|0.6075873126388648|1028.3654349007377|
|    min|      42159399217024|2.620729015336566...| -89.92247730288476|1.868718209532133E-7|        0.007789557|-377.74180694687686| -565.3362997837297|         3.0238004|       -0.54805183|          2739.997|
|    max| 6917515734718201472|   359.9982847741013|  89.77689025849239|   75.56887569382528|           1.519277|  649.0319386167508|  340.6398385516719|         18.999998|          7.822592|          37489.88|
+-------+--------------------+--------------------+-------------------+--------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+
                                                                                
Code
# 2. Register as a Temp View 
# This allows us to use SQL commands on the dataframe 'df'
df.createOrReplaceTempView("gaia_source")

print(">>> Executing Query 2.1: Galactic Plane vs Halo...")

# 3. The Spark SQL Query
# We use a CTE (Common Table Expression) named 'CalculatedData' to do the math first,
# and then the main query does the aggregation. 
query = """
    WITH CalculatedData AS (
        SELECT 
            source_id,
            -- Calculate Total Motion (Hypotenuse of pmra and pmdec)
            SQRT(POW(pmra, 2) + POW(pmdec, 2)) AS total_motion,
            
            -- Define Region using the 'DEC Proxy' method
            CASE 
                WHEN ABS(dec) < 15 THEN 'Galactic Plane' 
                ELSE 'Galactic Halo' 
            END AS region
        FROM gaia_source
    )
    
    -- Final Aggregation
    SELECT 
        region,
        ROUND(AVG(total_motion), 2) AS avg_speed,
        ROUND(STDDEV(total_motion), 2) AS stddev_speed,
        COUNT(*) AS star_count
    FROM CalculatedData
    GROUP BY region
    ORDER BY avg_speed DESC
"""

# 4. Run the query
sql_results = spark.sql(query)

# 5. Show results
sql_results.show()
>>> Executing Query 2.1: Galactic Plane vs Halo...
+--------------+---------+------------+----------+
|        region|avg_speed|stddev_speed|star_count|
+--------------+---------+------------+----------+
| Galactic Halo|     7.03|        7.73|    691873|
|Galactic Plane|     6.96|        8.96|    152995|
+--------------+---------+------------+----------+

2.2.1.1 Analysis

The objective of this query was to identify stellar kinematics by comparing the proper motion of stars in the dense Galactic Disk and Galactic Halo.

  • Metric: We calculated the “Total Proper Motion” (\(\mu\)) for each star by combining its two components: \(\mu = \sqrt{\texttt{pmra}^2 + \texttt{pmdec}^2}\).
  • Segmentation: Due to dataset constraints, we utilized Declination (dec) as a proxy for Galactic Latitude. We defined the “Galactic Plane” as the equatorial band (\(|dec| < 15^{\circ}\)) and the “Halo” as the high-latitude regions (\(|dec| \ge 15^{\circ}\)).

Finding A: The Problem “Missed Galaxy”(Star Count)

  • the simple Reason is that the Milky Way is tilted.
  • Our query only sees the flat strip across the middle (dec between -15 and 15 degrees).Because of that our “Flat strip” missed the biggest parts of the galaxy.

Finding B: The Velocity Variation

  • We Expected the “Halo” to have a higher velocity than the “Plane”.Instead the “Disk” have the biggest range (8.96 vs 7.73).

The simple reason is the distance changes how speed looks.

  • The Disk: Contains many stars that are close to Earth. Because they are close, their speeds look dramatic and varied to our camera.

  • The Halo: Stars are incredibly far away. Even if they are moving fast, their distance makes them all appear to move slowly and steadily, leading to a “lower” measurement.

2.2.1.2 Visualization

Code
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# 1. Take a random 1% sample (Crucial for performance)
pdf_subset = df.select("ra", "dec").sample(fraction=0.01, seed=42).toPandas()

# 2. Re-create the Region logic (using numpy to avoid errors)
pdf_subset['Region'] = pdf_subset['dec'].apply(lambda x: 'Galactic Plane' if np.abs(x) < 15 else 'Galactic Halo')

# 3. SET THE THEME: Dark Background for a "Space" look
plt.style.use('dark_background')
plt.figure(figsize=(12, 7))

# 4. Plot the "Halo" (Background Stars)
# We plot these first in cool blue so they look "distant"
sns.scatterplot(
    data=pdf_subset[pdf_subset['Region'] == 'Galactic Halo'], 
    x='ra', 
    y='dec', 
    color='cornflowerblue', 
    s=5,           # Small dots
    alpha=0.3,     # Faint transparency
    edgecolor=None,
    label='Galactic Halo (Sparse)'
)

# 5. Plot the "Plane" (The Disk)
# We plot these on top in bright Gold to represent the dense star field
sns.scatterplot(
    data=pdf_subset[pdf_subset['Region'] == 'Galactic Plane'], 
    x='ra', 
    y='dec', 
    color='#FFD700', # Gold color
    s=10,            # Slightly larger dots
    alpha=0.4,       # Brighter
    edgecolor=None,
    label='Galactic Plane (Dense)'
)

# Draw the cut-off lines
plt.axhline(15, color='white', linestyle='--', linewidth=1, alpha=0.5)
plt.axhline(-15, color='white', linestyle='--', linewidth=1, alpha=0.5)

# Add text labels on the graph
plt.text(180, 0, "Milky Way Disk\n(High Density)", color='orangered', 
         ha='center', va='center', fontsize=12, )

plt.text(180, 60, "Galactic Halo\n(Low Density)", color='cornflowerblue', 
         ha='center', va='center', fontsize=10)

# 7. Final Polish
plt.title("Spatial Structure: The 'Flat' Disk vs. The 'Round' Halo", fontsize=14, color='white')
plt.xlabel("Right Ascension (Longitude)", fontsize=12)
plt.ylabel("Declination (Latitude)", fontsize=12)
plt.legend(loc='upper right', facecolor='black', edgecolor='white')
plt.grid(False) # Turn off grid to look more like space

# Astronomers view the sky looking "up", so we invert the X-axis
plt.gca().invert_xaxis()

plt.show()

Galactic Plane vs Halo

2.2.2 Star Density Sky Map

Code
print(">>> Executing Query 2.2: Star Density Sky Map")

query_density = """
    WITH DensityBins AS (
        SELECT 
            -- 1. Spatial Binning (The 'Grid')
            -- We divide by 2, floor it to remove decimals, then multiply by 2
            -- This snaps every star to the nearest even number grid line (0, 2, 4...)
            FLOOR(ra / 2) * 2 AS ra_bin,
            FLOOR(dec / 2) * 2 AS dec_bin,
            
            -- 2. Aggregation (Counting stars in that grid square)
            COUNT(*) AS star_count
        FROM gaia_source
        GROUP BY 1, 2  -- Group by the first two columns (ra_bin, dec_bin)
    ),
    
    RankedRegions AS (
        SELECT 
            *,
            -- Rank the bins from most populated (1) to least populated
            RANK() OVER (ORDER BY star_count DESC) as density_rank
        FROM DensityBins
    )
    
    -- 4. Final Result: Top 5 Densest Regions
    SELECT * FROM RankedRegions 
    WHERE density_rank <= 5
"""

# 3. Run and Show
sql_density_results = spark.sql(query_density)
sql_density_results.show()
>>> Executing Query 2.2: Star Density Sky Map
25/12/16 22:01:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/16 22:01:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/16 22:01:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/16 22:01:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/16 22:01:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+------+-------+----------+------------+
|ra_bin|dec_bin|star_count|density_rank|
+------+-------+----------+------------+
|   270|    -30|      2135|           1|
|   272|    -30|      2096|           2|
|   268|    -30|      2036|           3|
|   272|    -28|      2011|           4|
|   274|    -28|      1762|           5|
+------+-------+----------+------------+
25/12/16 22:01:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/16 22:01:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

2.2.2.1 Analysis

The goal of this query was to create the “Stellar Density Map” to identify the most populated region in the sky.

  • Spatial Binning Strategy: We have utilized a qantization approach to divide the continuous sky into discrete grid. FLOOR(coordinate / 2) * 2 to both Right Ascension (ra) and Declination (dec), we grouped stars into \(2^{\circ} \times 2^{\circ}\) spatial bins.
  • Analytical Complexity: To Rank this density, we have incorporated the pyspark Window Function (RANK() OVER (ORDER BY star_count DESC)).

Critical Analysis:

  1. The results has the perfect validation of the binning algorithm and identified Galactic centre.
  2. Astronomical Validation: The coordinates returned (\(RA \approx 270^{\circ}\), \(Dec \approx -30^{\circ}\)) correspond precisely to the constellation Sagittarius.
  • The official coordinates of Sagittarius A* (the supermassive black hole at the center of the Milky Way)re \(RA \approx 266^{\circ}\), \(Dec \approx -29^{\circ}\).

Interpretation:

The high star count in these bins conform that confirm that we are looking through the galactic plate and the centre bulge.

2.2.2.2 Visualization

Code
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.sql.functions import col

# 1. Reuse the "DensityBins" logic from Query 2.2, but save it as a real Dataframe
density_bins = spark.sql("""
    SELECT 
        FLOOR(ra / 2) * 2 AS ra_bin, 
        FLOOR(dec / 2) * 2 AS dec_bin,
        COUNT(*) as local_density
    FROM gaia_source 
    GROUP BY 1, 2
""")

# We give every star a new property: "How many neighbors do I have?"
# We calculate the bins on the fly for the join
df_with_density = df.withColumn("ra_bin", F.floor(col("ra") / 2) * 2) \
                    .withColumn("dec_bin", F.floor(col("dec") / 2) * 2) \
                    .join(density_bins, ["ra_bin", "dec_bin"])

# This is enough to look like "every star" to the human eye without crashing.
pdf_visual = df_with_density.sample(fraction=0.05, seed=42).select("ra", "dec", "local_density").toPandas()

# 4. The "Glowing" Scatter Plot
plt.style.use('default')
plt.figure(figsize=(14, 8))

# We sort by density so the bright stars are plotted ON TOP of the dark ones
pdf_visual = pdf_visual.sort_values("local_density")

scatter = plt.scatter(
    pdf_visual['ra'], 
    pdf_visual['dec'], 
    c=pdf_visual['local_density'], # Color by density
    cmap='magma',                # Magma/Inferno = glowing fire effect
    s=2,                           # Tiny dots 
    alpha=0.8,                     # High opacity to make them pop
    edgecolors='none'              # No borders
)

# 5. Add a Color Bar (Legend)
cbar = plt.colorbar(scatter)
cbar.set_label('Stellar Density (Stars per bin)', rotation=270, labelpad=20, color='Black')
cbar.ax.yaxis.set_tick_params(color='Black')
plt.setp(plt.getp(cbar.ax.axes, 'yticklabels'), color='Black')

# 6. Styling
plt.title("The Milky Way: Stellar Density Visualization", fontsize=18, color='Black' )
plt.xlabel("Right Ascension", fontsize=12, color='Black')
plt.ylabel("Declination", fontsize=12, color='Black')
plt.gca().invert_xaxis() # Astronomical standard
plt.grid(False)

plt.show()

Star Density Sky Map

2.2.3 Parallax Error vs Brightness

Code
print(">>> Executing Query 2.3: Parallax Error vs. Brightness...")

query_quality = """
    WITH QualityMetrics AS (
        SELECT 
            -- 1. Create Brightness Bins (0-21)
            -- FLOOR groups '10.2' and '10.9' into bin '10'
            FLOOR(phot_g_mean_mag) AS mag_bin,
            
            -- Pass through the raw data we need
            parallax_error,
            parallax
        FROM gaia_source
        WHERE parallax > 0  -- Filter out bad data to avoid division by zero
    )
    
    -- 2. Aggregation to find the Error Trend
    SELECT 
        mag_bin AS magnitude,
        
        -- A. Count how many stars are in this brightness range
        COUNT(*) AS star_count,
        
        -- B. Average Absolute Error (Raw uncertainty)
        ROUND(AVG(parallax_error), 4) AS avg_raw_error,
        
        -- C. Average Relative Error (The "Percentage" uncertainty)
        -- Formula: Error / Total Signal
        ROUND(AVG(parallax_error / parallax), 4) AS avg_relative_error
        
    FROM QualityMetrics
    WHERE mag_bin > 0 AND mag_bin < 22 -- Focus on the valid main range
    GROUP BY mag_bin
    ORDER BY mag_bin ASC -- Sort from Bright -> Dim
"""

# 3. Run and Show
quality_results = spark.sql(query_quality)
quality_results.show(25) # Show 25 rows to see the full range
>>> Executing Query 2.3: Parallax Error vs. Brightness...
+---------+----------+-------------+------------------+
|magnitude|star_count|avg_raw_error|avg_relative_error|
+---------+----------+-------------+------------------+
|        3|         1|       0.1317|            0.0438|
|        4|         1|       0.0749|            0.0146|
|        5|         7|       0.0586|             0.023|
|        6|        27|       0.0426|            0.0125|
|        7|        61|       0.0357|             0.013|
|        8|       197|       0.0401|            0.0207|
|        9|       488|       0.0263|             0.017|
|       10|      1304|       0.0264|            0.0229|
|       11|      3007|       0.0265|            0.0299|
|       12|      7054|       0.0246|            0.0487|
|       13|     15489|       0.0239|            0.0655|
|       14|     32688|       0.0297|            0.2688|
|       15|     66266|       0.0411|            1.2044|
|       16|    125846|       0.0633|            0.4907|
|       17|    223654|        0.105|            1.4188|
|       18|    368778|       0.1929|            4.8654|
+---------+----------+-------------+------------------+

2.2.3.1 Analysis

The objective of this query was to evaluate the steller technique’s depandibity at various star magnitudes. Before entering to the machine learning part, it is essetional to indentify the “Signal-to-Noice” ratios.

  • Binning Strategy: We grouped stars by their Apparent Magnitude (phot_g_mean_mag) into integer bins (e.g., Magnitude 10, 11, 12…).
  • For each bin we have clacualted:
    • Average Absolute Error AVG(parallax_error)
    • Average Relative Error AVG(parallax_error / parallax)
  • Statistics:
    • Bright stars - (Mag < 13 ) High photon counts result in high-precision centroids, leading to low parallax error.
    • Dim stars - (Mag > 13 )Lower signal-to-noise ratios should result in exponentially increasing errors.

2.2.3.2 Visualization

Code
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# 1. Convert Spark Result to Pandas
pdf_quality = quality_results.toPandas()

# 2. Setup the Plot (Dual Axis)
# We want to show Star Count (Bars) AND Error Rate (Line) on the same chart
fig, ax1 = plt.subplots(figsize=(12, 6))

# 3. Plot A: Star Count (The Histogram) on Left Axis
# This shows where most of our data lives
sns.barplot(
    data=pdf_quality, 
    x='magnitude', 
    y='star_count', 
    color='cornflowerblue', 
    alpha=0.3, 
    ax=ax1,
    label='Star Count'
)
ax1.set_ylabel('Number of Stars (Log Scale)', color='blue')
ax1.tick_params(axis='y', labelcolor='blue')
ax1.set_yscale('log') # Log scale because star counts vary wildly (1 to 300,000)

# 4. Plot B: The Parallax Error (AVG Error) on Right Axis
ax2 = ax1.twinx()

sns.lineplot(
    data=pdf_quality, 
    x=ax1.get_xticks(), # Align line with bars
    y='avg_raw_error',  # AVG(parallax_error)
    color='red', 
    marker='o', 
    linewidth=2,
    ax=ax2,
    label='Avg Parallax Error (mas)'
)
ax2.set_ylabel('Avg Parallax Error (milliarcseconds)', color='red')
ax2.tick_params(axis='y', labelcolor='red')

# 5. Titles and Layout
plt.title("Data Quality Audit (Error Rate vs. Brightness)", fontsize=14)
ax1.set_xlabel("Apparent Magnitude (Lower = Brighter)", fontsize=12)
plt.grid(True, linestyle=':', alpha=0.5)

plt.tight_layout()
plt.show()

Parallax Error vs Brightness
Code
spark.stop()

2.3 Jayrup (Exotic Star Hunting)

Finding rare and interesting stellar objects

2.3.1 White Dwarf Candidates

To find White Dwarfs, which are the hot, dense cores of dead stars. They are very hot but very dim. They are the remains of dead stars that have not yet completely cooled down, they start out bright-blue (top-right) and as they age, they turn dim-red (bottom-right). Scientists use this to estimate the age of the universe as their cooling down rate is fairly stable.

2.3.1.1 Importing Libraries

Code
import numpy as np
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import matplotlib.pyplot as plt
import matplotlib.colors as colors
from matplotlib.collections import LineCollection

# 1. Setup Spark (Use existing data)
spark = SparkSession.builder \
    .appName("WhiteDwarf_Hunter") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

2.3.1.2 Loading Data

Code
# data for the nearest 100 parsecs sources
df = spark.read.parquet("../data/gaia_100pc.parquet")
df.createOrReplaceTempView("gaia")

2.3.1.3 Color-Magnitude Method

We’re isolating white dwarfs within 100 parsecs using Gaia’s precise astrometry and photometry. White dwarfs occupy a distinct region in the HR diagram:

  • Faint but not too faint to avoid noisy detections (\(M_G\) \(10–15\))
  • Blue-to-yellow colors where the cooling sequence is densest (\(BP–RP < 1.0\))
  • Excluding outliers to remove rare hot subdwarfs (\(BP–RP ≥ -0.5\))

We calculate the Absolute magnitude \(M_G\) with

\[ M_G = m + 5 \log_{10}(parallax) - 10 \]

Code
# Find the White Dwarfs candidates using Color-Magnitude method
df_wd = spark.sql("""
    WITH candidates AS (
        SELECT *,
               phot_g_mean_mag + 5 * LOG10(parallax) - 10 AS absolute_magnitude
        FROM gaia
        WHERE parallax > 0
          AND bp_rp >= -0.5
          AND bp_rp < 1.0
          AND phot_g_mean_mag IS NOT NULL
          AND bp_rp IS NOT NULL
    )
    SELECT *,
           CASE
             WHEN absolute_magnitude > 10 AND absolute_magnitude < 15
             THEN 'White Dwarf'
             ELSE 'Main Sequence / Other'
           END AS type
    FROM candidates
""")

# Trigger computation and cache
df_wd.cache().count()

# Count WDs
wd_count = df_wd.filter(col("type") == "White Dwarf").count()
print(f">> FOUND: {wd_count} White Dwarfs using Color-Magnitude method.")
>> FOUND: 12580 White Dwarfs using Color-Magnitude method.
Code
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as colors
from pyspark.sql.functions import col

# === 1. OPTIMIZED DATA LOADING ===
# Only pull necessary columns + filter in Spark
wd_pandas = (
    df_wd.filter(col("type") == "White Dwarf")
    .select("bp_rp", "absolute_magnitude") 
    .filter(
        (col("bp_rp").between(-0.5, 1.5)) & 
        (col("absolute_magnitude").between(8, 16)) &
        col("bp_rp").isNotNull() &
        col("absolute_magnitude").isNotNull()
    )
    .toPandas()
)

spark.stop()

# === 2. VECTORIZED DENSITY CALCULATION ===
x = wd_pandas['bp_rp'].values
y = wd_pandas['absolute_magnitude'].values

# Precompute bin edges 
x_edges = np.linspace(-0.5, 1.5, 151)
y_edges = np.linspace(8, 16, 151)

# Compute histogram
H, _, _ = np.histogram2d(x, y, bins=[x_edges, y_edges])

# Digitize using precomputed edges
x_inds = np.digitize(x, x_edges[1:-1])
y_inds = np.digitize(y, y_edges[1:-1])

# Get densities directly from histogram array
z = H[x_inds, y_inds]

# === 3. SORTING OPTIMIZATION ===
# Use argsort on density (z) - but only if >10k points
if len(z) > 10000:
    idx = np.argpartition(z, -10000)[-10000:]  # Keep only top 10k densest points
    x_sorted, y_sorted, z_sorted = x[idx], y[idx], z[idx]
else:
    idx = z.argsort()
    x_sorted, y_sorted, z_sorted = x[idx], y[idx], z[idx]

# === 4. PLOTTING ===
plt.figure(dpi=120)

# Use scatter with pre-sorted points (densest on top)
sc = plt.scatter(
    x_sorted, y_sorted,
    c=z_sorted,
    s=1.5,
    cmap='gist_heat',
    norm=colors.LogNorm(vmin=1, vmax=np.max(H)),  # Precomputed vmax
    alpha=0.95,
    edgecolors='none',
    rasterized=True  # converts to bitmap for huge speedup
)

# === 5. ASTRONOMY POLISH ===
ax = plt.gca()
ax.invert_yaxis()

ax.set_title("Gaia DR3: White Dwarf Cooling Sequence", fontsize=14)
ax.set_xlabel("Gaia BP–RP colour", fontsize=12)
ax.set_ylabel("Gaia G absolute magnitude", fontsize=12)

# Colorbar with precomputed norm
cbar = plt.colorbar(sc, pad=0.02)
cbar.set_label('Stars per bin (log scale)', rotation=270, labelpad=20)

ax.grid(True, alpha=0.2, linewidth=0.5)
plt.tight_layout(pad=1.5)  # Faster than default layout

plt.show()
plt.close()  # Free memory immediately

2.3.1.4 What This Plot Reveals

This is the white dwarf cooling sequence the evolutionary path of dead stars in our cosmic neighborhood. Here’s what it tells us:

  1. The Diagonal Band
    • White dwarfs start hot and blue (top-left: \(BP-RP ≈ -0.3, M_G ≈ 10\))
    • They cool and redden over billions of years, moving down and right (bottom-right: \(BP-RP ≈ 1.0, M_G ≈ 15\))
  2. The Color Gradient
    • Bright orange/white regions: High density of white dwarfs (common evolutionary stages)
    • Dark red regions: Fewer white dwarfs (rare or short-lived phases)
  3. The Smooth Curve
    • This sequence is a stellar “fossil record”—it reveals how long white dwarfs have been cooling
    • The gap at top-left (BP-RP < -0.2) shows very hot white dwarfs are rare (they cool quickly)
    • The smooth curve confirms white dwarfs cool predictably—like cosmic thermometers
  4. The “Bifurcation”
    • The population is segregated : If you look closely at the diagonal band, it isn’t a single smear; it’s split into two distinct, parallel “ridges” or tracks. White Dwarfs are not a homogeneous group. This split typically represents a difference in atmospheric composition.
      • Track A (Top/Blue ridge): Likely stars with Hydrogen-rich atmospheres (DA white dwarfs). Hydrogen is lighter and more opaque, acting as a blanket that keeps heat in differently than Helium.
      • Track B (Bottom/Red ridge): Likely stars with Helium-rich atmospheres (DB white dwarfs).

Key insight: The densest part (bright orange) is where most white dwarfs “spend” their lives—proving they cool slowly over billions of years. This plot is why astronomers call white dwarfs “cosmic clocks” for measuring the age of our galaxy.

2.3.2 Red Giant Candidates

  • Goal: To find Red Giants, which are old, dying stars. They are very cool but very bright.
  • Columns Needed: teff_gspphot, parallax, phot_g_mean_mag
  • SQL Complexity: This is identical in structure to Query 3.1, but with inverted filters. This shows you understand how to apply domain knowledge.
    1. Subquery or CTE: Same as 3.1, create a view with absolute_magnitude.
    2. Complex Filtering: Select from this temporary table WHERE teff_gspphot < 4500 (cooler than 4,500 K) AND absolute_magnitude < 1 (brighter than 1).
  • Visualization: A Scatter Plot of teff_gspphot vs. absolute_magnitude for these candidates. [cite_start]They will form the “giant branch” at the top right of the H-R diagram[cite: 85].
Code
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as colors

# 1. Setup Spark 
spark = SparkSession.builder \
    .appName("RedGiant_DensityPlot") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Load only necessary columns
df = spark.read.parquet("../data/gaia_survey.parquet")


# create temp view
df.createOrReplaceTempView("gaia_source")

We use two seperate subqueries for the visualization. One to get the general idea and show the relative postion of the red-giants on the HR-daigram and the other to zoom it so we can analyze them better.

Code
df_all = spark.sql("""
    SELECT 
        bp_rp,
        phot_g_mean_mag + 5 * LOG10(parallax) - 10 AS absolute_magnitude
    FROM gaia_source
    WHERE parallax > 0
      AND parallax/parallax_error > 10
      AND bp_rp BETWEEN -0.5 AND 3.5
      -- AND (phot_g_mean_mag + 5 * LOG10(parallax) - 25) < 12
""")

df_rg = spark.sql("""
    SELECT 
        bp_rp,
        phot_g_mean_mag + 5 * LOG10(parallax) - 10 AS absolute_magnitude,
        parallax/parallax_error AS parallax_snr
    FROM gaia_source
    WHERE parallax > 0
      AND parallax/parallax_error > 10  -- Good parallax quality
      AND bp_rp BETWEEN 0.7 AND 2.5     -- Focus on RGB color range
      AND phot_g_mean_mag + 5 * LOG10(parallax) - 10 < 4.0  -- Bright giants
      AND bp_rp IS NOT NULL
      AND phot_g_mean_mag IS NOT NULL
""")

# Convert to pandas for plotting
all_pandas = df_all.toPandas()
rg_pandas = df_rg.toPandas()
spark.stop()
Code
# print no. of stars found
# print(f"Found {len(all_pandas)} stars in the survey.")

hb = plt.hexbin(
    x=all_pandas['bp_rp'], y=all_pandas['absolute_magnitude'],
    gridsize=500,              # High res = no blocky look
    extent=[-0.5, 3.5, -5, 12], # Fixed window for consistency
    norm=colors.LogNorm(),     # Essential: Compresses the dynamic range
    cmap='inferno',            # Perceptually uniform (Blue/Black -> Red -> Yellow)
    mincnt=1                   # Don't plot empty space
)

# Anotations
ax = plt.gca()
ax.invert_yaxis()

# A. The Main Sequence (High Density)
ax.text(1.6, 10.0, 'Main Sequence', color='purple', fontsize=10, 
        ha='right', rotation=-30)


# C. The True Evolutionary Path (Sub-Giant Branch)
# This arrow follows the curve, not a straight line
# Coordinates: Turn-off point -> Base of RGB
ax.annotate('', xy=(1.0, 0), xytext=(0.2,- 0.2),
            arrowprops=dict(arrowstyle='->', lw=2, color='cyan', connectionstyle="arc3,rad=-0.2"))
ax.text(0,0, 'Giant\nBranch', color='purple', fontsize=9, ha='center')

# Polish
ax.set_xlabel("Gaia BP–RP colour", fontsize=12)
ax.set_ylabel("Absolute Magnitude ($M_G$)", fontsize=12)

# Colorbar
cb = plt.colorbar(hb, pad=0.02)
cb.set_label('Star Density (Log Scale)', rotation=270, labelpad=20)

plt.tight_layout()
plt.show()

# print no. of stars found
# print(f"Found {len(rg_pandas)} stars in the survey.")

hb = plt.hexbin(
    x=rg_pandas['bp_rp'], y=rg_pandas['absolute_magnitude'],
    gridsize=500,              # High res = no blocky look
    # extent=[-0.5, 3.5, -5, 12], # Fixed window for consistency
    norm=colors.LogNorm(),     # Essential: Compresses the dynamic range
    cmap='inferno',            # Perceptually uniform (Blue/Black -> Red -> Yellow)
    mincnt=1                   # Don't plot empty space
)

# Anotations
ax = plt.gca()
ax.invert_yaxis()

# A. The Main Sequence (High Density)
# ax.text(1.6, 3, 'Main Sequence', color='white', fontsize=10, 
#         ha='right', rotation=-30)

plt.text(0.8, 3.5, 'Main Sequence', 
         color='white', fontsize=8, fontweight='bold',
         bbox=dict(facecolor='none', edgecolor='white', alpha=0.5))

# C. The True Evolutionary Path (Sub-Giant Branch)
# This arrow follows the curve, not a straight line
# Coordinates: Turn-off point -> Base of RGB
# ax.annotate('', xy=(1.0, 0), xytext=(0.2,- 0.2),
#             arrowprops=dict(arrowstyle='->', lw=2, color='cyan', connectionstyle="arc3,rad=-0.2"))
# ax.text(0,0, 'Giant\nBranch', color='purple', fontsize=9, ha='center')

# Polish
ax.set_xlabel("Gaia BP–RP colour", fontsize=12)
ax.set_ylabel("Absolute Magnitude ($M_G$)", fontsize=12)

# Colorbar
cb = plt.colorbar(hb, pad=0.02)
cb.set_label('Star Density (Log Scale)', rotation=270, labelpad=20)

plt.tight_layout()
plt.show()

Zoomed out

Zoomed in

Gaia DR3: Stellar Density Map

2.3.2.1 What This Plot Reveals

This is the Red Giant Branch (RGB), the evolutionary path of aging, low- to intermediate-mass stars as they exhaust hydrogen in their cores and begin shell burning. Here’s what it tells us:

  1. The Bright, Red Clump
    • Red giants are cool (\(BP–RP ≈ 1.0–2.5\)) but extremely luminous (\(M_G < 0\)), placing them in the upper-right of the HR diagram.
    • The densest region—the Red Clump (\(around M_G ≈ −1, BP–RP ≈ 1.3\))—marks stars stably burning helium in their cores. This acts as a “standard candle” for distance measurements.
  2. The Ascending Giant Branch
    • Stars move upward (brighter) and slightly redder as their outer envelopes expand after leaving the Main Sequence.
    • The smooth, curved track reflects predictable stellar evolution governed by mass and composition.
  3. Low-Mass vs. Upper RGB
    • At fainter magnitudes (M_G ≈ 2–4), we see lower-mass giants just starting their ascent.
    • The brightest, reddest stars (M_G < −1) are near the tip of the RGB, where helium ignition occurs in a dramatic flash for low-mass stars.
  4. Why It Matters
    • The RGB is a stellar aging sequence: its shape and density reveal the star formation history of our galactic neighborhood.
    • Because red giants are bright, they’re visible across vast distances—making them key tracers of galactic structure.

In short: this plot shows stars in their golden (actually red!) retirement, glowing brightly as they near the end of their lives—before shedding their outer layers to become white dwarfs.

2.3.3 Co-Moving Pair Search (Binary Candidates)

  • Goal: To find pairs of stars that are “traveling together” through space—they have similar distances and similar proper motions. This implies they are a binary star system.
  • Columns Needed: source_id, ra, dec, parallax, pmra, pmdec
  • SQL Complexity: This is the most complex query of all, as it requires a self-join:
    1. Self-Join: FROM gaia_df A JOIN gaia_df B ON A.source_id < B.source_id (the < prevents joining a star with itself and getting duplicate pairs).
    2. Spatial Binning (Join Optimization): To avoid an \(N \times N\) comparison, you must first bin the stars by position and join only on the same bin: ...ON A.ra_bin = B.ra_bin AND A.dec_bin = B.dec_bin AND A.source_id < B.source_id.
    3. Complex Filtering: WHERE ABS(A.parallax - B.parallax) < 1 (similar distance) AND ABS(A.pmra - B.pmra) < 5 (similar motion) AND ABS(A.pmdec - B.pmdec) < 5 (similar motion).
  • [cite_start]Visualization: A Table listing the pairs (A.source_id, B.source_id)[cite: 85].

Binary stars are gravitationally bound systems sharing common motion through space. We can detect them by finding pairs with:

  • Similar positions (angular proximity)
  • Similar distances (parallax values)
  • Similar proper motions (space velocity vectors)
Code
import pyspark.sql.functions as F
# initialize spark session
spark = SparkSession.builder \
    .appName("Co-Moving_PairSearch") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Load your 100pc data
df = spark.read.parquet("../data/gaia_100pc.parquet")

# Add coarse sky bins (1° × 1°) - simple but effective
df = df.withColumn("ra_bin", F.floor(F.col("ra") / 1)) \
       .withColumn("dec_bin", F.floor(F.col("dec") / 1))
df.createOrReplaceTempView("stars")
Code
pairs = spark.sql("""
    SELECT A.source_id as id1, B.source_id as id2,
           A.ra, A.dec, A.parallax, 
           A.pmra, A.pmdec,
           B.ra as ra2, B.dec as dec2, B.parallax as plx2,
           B.pmra as pmra2, B.pmdec as pmdec2
    FROM stars A JOIN stars B
      ON A.ra_bin = B.ra_bin AND A.dec_bin = B.dec_bin
      AND A.source_id < B.source_id
    WHERE ABS(A.parallax - B.parallax) < 1      -- Same distance (within 1 mas)
      AND ABS(A.pmra - B.pmra) < 1              -- Same motion
      AND ABS(A.pmdec - B.pmdec) < 1
""")
Code
# Distance to each star (parsecs)
pairs = pairs.withColumn("dist_pc", 1000 / F.col("parallax"))

# Angular separation (degrees) and physical separation (AU)
pairs = pairs.withColumn(
    "ang_sep_deg", 
    F.degrees(F.acos(F.sin(F.radians("dec")) * F.sin(F.radians("dec2")) + 
                    F.cos(F.radians("dec")) * F.cos(F.radians("dec2")) * 
                    F.cos(F.radians("ra") - F.radians("ra2"))))
).withColumn("sep_au", F.col("ang_sep_deg") * 3600 * F.col("dist_pc"))

# Keep only likely physical pairs
binaries = pairs.filter(F.col("sep_au") < 10000)
print(f">> Found: {binaries.count()} candidate binaries")
[Stage 3:>                                                        (0 + 14) / 15][Stage 3:=======>                                                 (2 + 13) / 15][Stage 3:=================================================>       (13 + 2) / 15]
>> Found: 5129 candidate binaries
                                                                                
Code
# Get photometry for plotting (join back to original data)
binaries_with_phot = binaries.alias("p").join(
    df.select("source_id", "bp_rp", "phot_g_mean_mag").alias("phot"),
    F.col("p.id1") == F.col("phot.source_id")
).join(
    df.select("source_id", "bp_rp", "phot_g_mean_mag").alias("phot2"),
    F.col("p.id2") == F.col("phot2.source_id")
).select(
    "p.*", 
    F.col("phot.bp_rp").alias("color1"),
    F.col("phot.phot_g_mean_mag").alias("mag1"),
    F.col("phot2.bp_rp").alias("color2"),
    F.col("phot2.phot_g_mean_mag").alias("mag2")
)

# Calculate absolute magnitudes

plot_df = binaries_with_phot.toPandas()
plot_df['abs_mag1'] = plot_df['mag1'] + 5*np.log10(plot_df['parallax']) - 10
plot_df['abs_mag2'] = plot_df['mag2'] + 5*np.log10(plot_df['plx2']) - 10
[Stage 7:====================================================>      (8 + 1) / 9][Stage 10:>                                                       (0 + 14) / 15][Stage 10:===>                                                    (1 + 14) / 15][Stage 10:==================================>                      (9 + 6) / 15]                                                                                
Code
# Analyze the distribution
print("\n=== Binary Separation Distribution ===")
print(f"Median separation: {plot_df['sep_au'].median():.1f} AU")
print(f"Mean separation: {plot_df['sep_au'].mean():.1f} AU")
print(f"Closest pair: {plot_df['sep_au'].min():.1f} AU")
print(f"Widest pair: {plot_df['sep_au'].max():.1f} AU")

# Classification by separation
plot_df['binary_type'] = np.select([
    plot_df['sep_au'] < 100,
    plot_df['sep_au'] < 1000,
    plot_df['sep_au'] < 10000
], ['Close', 'Intermediate', 'Wide'], 'Very Wide')

=== Binary Separation Distribution ===
Median separation: 1685.8 AU
Mean separation: 2738.5 AU
Closest pair: 31.2 AU
Widest pair: 9994.3 AU
Code
# Plot: Sample 100 pairs for clarity
sample = plot_df.sample(n=min(100, len(plot_df)))


plt.gca().invert_yaxis()
plt.xlabel("Gaia BP-RP Color")
plt.ylabel("Absolute Magnitude ($M_G$)")

# Draw connecting lines
lines = [[(r['color1'], r['abs_mag1']), (r['color2'], r['abs_mag2'])] 
         for _, r in sample.iterrows()]
lc = LineCollection(lines, colors='gray', alpha=0.3, linewidths=0.5)
plt.gca().add_collection(lc)

# Plot the stars
plt.scatter(sample['color1'], sample['abs_mag1'], s=30, c='skyblue', label='Star A')
plt.scatter(sample['color2'], sample['abs_mag2'], s=30, c='orange', label='Star B')

plt.legend()
plt.tight_layout()
plt.show()

# histogram
plt.hist(plot_df['sep_au'], bins=50, log=True, color='steelblue', alpha=0.7)
plt.axvline(100, color='red', linestyle='--', label='Close (<100 AU)')
plt.axvline(1000, color='orange', linestyle='--', label='Intermediate')

# set labels

plt.xlabel("Physical Separation (AU)")
plt.ylabel("Count (log scale)")
plt.legend()
plt.grid(alpha=0.3)

plt.tight_layout()
plt.show()

Binary Systems on HR Diagram

Binary Separation Distribution

Binary stars within 100 parsecs